Skip to content

MultiDbClient implementation #3696

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: feat/active-active
Choose a base branch
from

Conversation

vladvildanov
Copy link
Collaborator

Pull Request check-list

Please make sure to review and check all of these items:

  • Do tests and lints pass with this change?
  • Do the CI tests pass with this change (enable it first in your forked repo and wait for the github action build to finish)?
  • Is the new or changed code fully tested?
  • Is a documentation update included (if this change modifies existing APIs, or introduces new ones)?
  • Is there an example added to the examples folder (if applicable)?

NOTE: these things are not required to open a PR and can be done
afterwards / while the PR is open.

Description of change

Added initial implementation of MultiDBClient that allows to operate on Active-Active databases

@vladvildanov vladvildanov requested a review from petyaslavova July 4, 2025 09:15
Thread-safe weighted list.
"""
def __init__(self):
self._items: List[tuple[Any, Union[int, float]]] = []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the custom Number type defined in typing instead of Union[int, float]

else:
left = mid + 1

self._items.insert(left, (weight, item))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the type definition of self._items, the order of the elements should be (weight, item):

self._items: List[tuple[Any, Union[int, float]]]

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice a slight inconsistency in the file regarding the order of item and weight throughout the methods in the file. For better clarity and consistency, it's best to use a single format throughout—either (weight, item) or (item, weight). Personally, I find (item, weight) more intuitive, as the item functions more like a key, with the weight acting as an associated value.


def register_listeners(self, event_listeners: Dict[Type[object], List[EventListenerInterface]]):
with self._lock:
for event in event_listeners:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the logic will be clearer if the event field is renamed to event_type

def update_weight(self, item, new_weight: float):
with self._lock:
"""Update weight of an item"""
old_weight = self.remove(item)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a weight property defined in the Database object as well. Shouldn't we update this one also, or is it handled somewhere else?

self._check_databases_health,
)

is_active_db = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property can be renamed to something like is_active_db_found or found_active_db - it will be easier to follow the code and understand the logic.


for existing_db, _ in self._databases:
if existing_db == database:
exists = True
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can break the cycle once the db is found.

exists = None

for existing_db, _ in self._databases:
if existing_db == database:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can break the cycle after db is found.

self._databases.update_weight(database, weight)

if weight > highest_weight and database.circuit.state == CBState.CLOSED:
database.state = DBState.ACTIVE
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block of code repeats on several places and can be extracted in a helper method.

self._event_dispatcher.dispatch(OnCommandFailEvent(args, e, self.active_database.client))

# Retry until failure detector will trigger opening of circuit
return self.execute_command(*args, **options)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we risk here to have the execution blocked on the same command for too long? The circle will open after 100 failures by default. If we have a command for which just the parsing of the response fails for some reason - it will be executed a hundred times.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants